RESUMEN SIMULADOR DE INVERSIÓN DOMÉSTICA
Y MERCADOS FINANCIEROS

Un enfoque Big Data para la planificación Domésica con AWS y PowerBI mediante la proyección de metas financieras alcanzables y simulación de MonteCarlo en los mercados S&P500 y Bitcoin

Portada del Proyecto - Casa sobre monedas

Autor: Pablo Vidal Vidal
Reto 5 Big Data & IA
pablo2vbngdaw@gmail.com

Haz clic en un elemento del índice para ir directamente o usa los botones de navegación.

1. 🚀 INTRODUCCIÓN 🎯

1.1. Problema a Resolver y Solución Propuesta

💡 PROBLEMA A RESOLVER:

Muchas personas tienen metas financieras 🏡💰✈️ (comprar una casa, jubilación, un viaje) pero les cuesta visualizar si sus hábitos de ahorro e inversión actuales les permitirán alcanzarlas, especialmente considerando la volatilidad de los mercados 📉📈.

SOLUCIÓN:

Un simulador innovador que:

  • 🔍 Analiza el comportamiento de ahorro del usuario a partir de un extracto bancario.
  • 🎯 Permite al usuario definir una meta financiera, un plazo y una estrategia de inversión (distribuyendo su ahorro mensual entre efectivo 💵, S&P 500 📊 y Bitcoin ).
  • 🎰 Utiliza datos históricos de mercado para simular miles de posibles escenarios futuros mediante la potente técnica de Monte Carlo.
  • 🧮 Calcula la probabilidad de alcanzar la meta financiera definida.
  • 📊 Presenta los resultados de forma clara, visual e interactiva en Microsoft Power BI.

1.2. Tecnologías Utilizadas y Diagrama de Arquitectura

⚙️ TECNOLOGÍAS UTILIZADAS:

  • ☁️ Almacenamiento de Datos: Amazon S3
  • 🛠️ ETL (Extracción, Transformación, Carga): AWS Glue
  • 💻 Procesamiento y Simulación: Amazon EMR (con Apache Spark)
  • 📈 Visualización: Microsoft Power BI

🔗 DIAGRAMA DE ARQUITECTURA:

(Se incluirá un diagrama de flujo visualizando ➡️☁️S3 -> 🛠️Glue -> ☁️S3 -> 💻EMR -> ☁️S3 -> 📊Power BI, detallando los tipos de datos en cada transición).

Diagrama de Arquitectura del Proyecto Diagrama de Arquitectura General del Proyecto

2. ☁️PASO 1: PREPARACIÓN DEL ENTORNO Y DATOS EN S3 📂

🎯 OBJETIVO DEL PASO: Crear la estructura de carpetas en Amazon S3 y subir los datasets iniciales (extracto bancario del usuario y datos históricos de mercado).

🛠️ ACCIONES DETALLADAS:

2.1. Creación de Bucket y Estructura de Carpetas

1. Crear un Bucket S3:

  • Nombre del bucket: mi-simulador-inversion-pablo-vidal
Creación de Bucket S3 Imagen 1: Creación del Bucket S3

2. Crear Estructura de Carpetas dentro del Bucket:

Creamos las siguientes carpetas (en S3, las "carpetas" son prefijos de objeto):

  • 📁 datos-entrada/
  • 📊 datos-mercado/
  • ⚙️ procesado-glue/ (subcarpetas: detalle_movimientos_mensual/, promedios_gastos_categorias/, aporte_simulacion_base/, parametros_mercado/)
  • ⚙️ procesado-emr/
  • 🐍 scripts/
Estructura de Carpetas en S3 Imagen 2: Estructura de Carpetas en el Bucket S3
2.2. Generación y Carga de Datos

3. Preparar los Datos Locales:

  • Extracto Bancario (extracto_bancario.csv): Generado con script Python generarextractobancario.py.
  • Archivos en S3 datos-entrada Imagen 3: Archivo extracto_bancario.csv en S3
  • Datos Históricos S&P 500 (sp500_historico.csv): Obtenido de Yahoo Finance vía script generarcsvsp500.py.
  • Datos Históricos Bitcoin (bitcoin_historico.csv): Obtenido de Yahoo Finance vía script generarcsvbitcoin.py.

4. Subir los Archivos a S3 en sus respectivas carpetas.

📝 RESUMEN PASO 1:

  • Bucket y estructura de carpetas creados en S3.
  • Datos (extracto, S&P500, Bitcoin) generados y cargados a S3.
  • Scripts de generación de datos guardados en S3.

3. 🛠️PASO 2: AWS GLUE - ETL DEL EXTRACTO BANCARIO ⚙️

🎯 OBJETIVO PRINCIPAL DEL PASO: Procesar extracto_bancario.csv para categorizar transacciones, agregar montos mensuales, calcular totales y ahorros, y generar CSVs detallados y de aporte para EMR.

3.1. Objetivo y Acciones (Crawler, Job, Transformaciones SQL)

1. Crawler crawler_extracto_bancario (Imagen 4)

Crawler ExtractoImagen 4

2. Tabla en Data Catalog (Imagen 5)

Tabla ExtractoImagen 5

3. Job Glue Studio Job_Extracto_Bancario (Imagen 6)

Job ExtractoImagen 6

Transformación Transformar_Extracto_detallado (Imagen 7), Destino S3 (Imagen 8)

SQL Extracto DetalleImagen 7
Destino Extracto DetalleImagen 8

Transformación Calcular_Promedios_Gasto (Imagen 9), Destino S3 (Imagen 10)

SQL Promedios GastoImagen 9
Destino Promedios GastoImagen 10

Transformación Calcular_Aporte_Base_Simulacion (Imagen 11), Destino S3 (Imagen 12)

SQL Aporte BaseImagen 11
Destino Aporte BaseImagen 12

3.2. Salidas: Detalle Mensual, Promedios de Gasto, Aporte para Simulación

Se generan y guardan en S3: detalle_movimientos_categorizados_mensual.csv, promedios_gastos_categorias.csv, y aporte_mensual_simulacion_base.csv.

📝 RESUMEN PASO 2:

Job de Glue procesa extracto bancario generando: detalle mensual, promedios de gasto, y el aporte base para simulación, todo guardado en S3.

4. 📈PASO 3: AWS GLUE - ETL DE DATOS DE MERCADO ⚙️

🎯 OBJETIVO DEL PASO: Procesar datos históricos de S&P 500 y Bitcoin para calcular μ y σ para cada activo, que se usarán en la simulación de EMR.

4.1. Objetivo y Acciones (Crawlers, Job, Transformaciones SQL)

1. Crawlers crawler_sp500_historico y crawler_bitcoin_historico (Imagen 13)

Crawlers MercadoImagen 13

2. Tablas en Data Catalog (Imagen 14)

Tablas MercadoImagen 14

3. Job Glue Studio job_procesar_datos_mercado (Imagen 15)

Job Datos MercadoImagen 15

Flujo Bitcoin (Imagen 16), Unión Retornos (Imagen 17), Cálculo Estadísticas (Imagen 18), Destino S3 (Imagen 19)

SQL Retornos BitcoinImagen 16
SQL Unión RetornosImagen 17
SQL Estadísticas MercadoImagen 18
Destino Parámetros MercadoImagen 19

4.2. Salida: Parámetros de Mercado (μ y σ para S&P 500 y Bitcoin)

El archivo parametros_mercado.csv con los 4 valores estadísticos se guarda en S3.

📝 Resumen del Paso 3:

Job de Glue procesa datos de mercado, calcula retornos, los une y obtiene μ y σ para S&P 500 y Bitcoin, guardados en S3. 🚀

5. 🎰PASO 4: SIMULACIÓN MONTE CARLO CON AWS EMR 🚀

🎯 Objetivo: Utilizar Amazon EMR y Apache Spark para ejecutar miles de simulaciones de Monte Carlo, proyectando el crecimiento del capital del usuario y calculando la probabilidad de alcanzar su meta financiera.

5.1. Objetivo y Configuración del Entorno EMR (Clúster, Studio, Notebook)

1. Crear Clúster Clusterparamisimulador (Imagen 20)

Creación Clúster EMRImagen 20

2. Crear EMR Studio y Workspace (Imagen 21)

EMR Studio y WorkspaceImagen 21

3. Asociar Clúster y Lanzar Notebook (Imagen 22)

Lanzar Notebook EMRImagen 22

5.2. Desarrollo en Notebook PySpark:

5.2.1. Configuración y Carga de Datos (Celda 1) (Imagen 23)

EMR Notebook Celda 1Imagen 23

5.2.2. Ejecución de Simulaciones (Celda 2) (Imagen 24)

EMR Notebook Celda 2Imagen 24

5.2.3. Análisis y Almacenamiento de Resultados (Celda 3) (Imagen 25)

EMR Notebook Celda 3Imagen 25

🔑 Resultado Clave: ¡Archivos listos en S3 con toda la información que Power BI necesita! 📉📈✅

6. 💾PASO 5: COMPROBACIÓN DE RESULTADOS EN S3

Verificación de los archivos CSV generados en S3: resultados_simulacion_distribucion.csv, resultados_simulacion_proyeccion_mediana.csv, sumario_simulacion.csv.

Resultados EMR en S3 Imagen 26: Archivos CSV resultantes de la simulación en S3

7. 🎨PASO 6: VISUALIZACIÓN CON POWER BI ✨📊

🎯 Objetivo del Paso: Conectar Power BI a los datos en S3, importarlos, transformarlos si es necesario, y construir un dashboard interactivo con KPIs y gráficos.

7.1. Conexión, Importación y Transformación de Datos

Conectar Power BI a los CSVs en S3 y realizar transformaciones necesarias en Power Query (ej. delimitadores decimales).

7.2. Diseño del Dashboard (KPIs, Gráficos de Simulación e Históricos)

Vista 1: Simulación Personalizada (Imagen 27)

Power BI Dashboard Vista 1Imagen 27

Vista 2: Evolución Histórica de Inversiones (Imagen 28)

Power BI Dashboard Vista 2Imagen 28

8. 🏁CONCLUSIÓN DEL PROYECTO

Este proyecto ha sido un viaje fascinante a través del mundo del Big Data y la analítica financiera, culminando en la creación de un Simulador de Inversión Personalizado. Partiendo de la necesidad real de visualizar el camino hacia metas financieras, hemos construido una solución integral que aprovecha la potencia y flexibilidad del ecosistema AWS, junto con la capacidad de visualización de Microsoft Power BI.

8.1. Recorrido por la Solución, Valor y Aprendizaje

Un Recorrido por la Solución:

  1. ☁️ **Cimientos Sólidos en S3:** Establecimos nuestro data lake en Amazon S3, organizando los datos de entrada del usuario (extractos bancarios generados programáticamente) y los datos históricos de mercado (S&P 500 y Bitcoin, también obtenidos mediante scripts Python que interactúan con Yahoo Finance).
  2. 🛠️ **Transformación con AWS Glue:**
    • Procesamos los extractos bancarios para obtener un análisis detallado de ingresos, gastos por categoría, y, crucialmente, el aporte mensual base para la inversión.
    • Analizamos los datos históricos de mercado para calcular los parámetros estadísticos esenciales (retorno promedio mensual μ y volatilidad mensual σ) para el S&P 500 y Bitcoin.
    Estos procesos de ETL, diseñados en AWS Glue Studio, prepararon los datos de manera eficiente para la siguiente etapa.
  3. 🎰 **Simulación Potente con Amazon EMR y Spark:**
    • Configuramos un clúster de EMR y, utilizando un Notebook de Jupyter (PySpark), implementamos una simulación de Monte Carlo.
    • Este motor tomó el aporte mensual del usuario, los parámetros de mercado (μ y σ), y la estrategia de inversión definida para proyectar miles de posibles escenarios futuros de crecimiento del capital.
    • Calculamos la probabilidad de alcanzar la meta financiera y identificamos la trayectoria mediana para una visualización representativa. Los resultados se almacenaron de nuevo en S3, listos para el análisis.
  4. 📊 **Visualización Impactante con Power BI:**
    • Conectamos Power BI a los resultados en S3.
    • Creamos un dashboard que presenta de forma clara: Los KPIs clave (meta, plazo, probabilidad de éxito, capital mediano), un gráfico de proyección de la cartera mediana, ilustrando el poder del interés compuesto, y una vista de la evolución histórica de las inversiones elegidas (S&P 500 y Bitcoin).

Valor y Aprendizaje:

Este proyecto no solo demuestra la aplicación práctica de tecnologías de Big Data en un caso de uso financiero relevante, sino que también subraya la importancia de un flujo de datos bien orquestado, desde la ingesta y transformación hasta el procesamiento avanzado y la visualización final. La capacidad de integrar servicios como S3, Glue, EMR y Power BI abre un abanico de posibilidades para análisis complejos y toma de decisiones informadas.

8.2. Agradecimientos y Cierre

Ha sido un placer aprender durante el curso. Se nota que David tiene pasión por lo que hace y así lo transmite. Desarrollar este simulador, profundizando en cada una de las herramientas y superando los desafíos técnicos que surgieron. Ha sido difícil y he obviado alguna cosa pero la oportunidad de construir algo tan tangible y con potencial de ayudar a las personas a planificar su futuro financiero ha sido increíblemente gratificante.

Espero sinceramente que este proyecto y su presentación sean de su agrado y cumplan con las expectativas. Muchísimas gracias por la oportunidad de trabajar en este reto y por su guía a lo largo del proceso.

🚀 ¡Gracias! 🚀

9. 📎APÉNDICES

9.1. Scripts Python de Preparación de Ingesta

generarextractobanario.py

import pandas as pd
import random
from datetime import datetime, timedelta
import os

def generar_fecha_aleatoria_mes(year, month, dia_inicio=1, dia_fin=28):
    """Genera una fecha aleatoria dentro de un mes y rango de días."""
    start_date = datetime(year, month, dia_inicio)
    try:
        datetime(year, month, dia_fin)
        dia_fin_real = dia_fin
    except ValueError:
        end_of_month = (start_date.replace(day=1) + timedelta(days=32)).replace(day=1) - timedelta(days=1)
        dia_fin_real = end_of_month.day
    
    dia_inicio_real = min(dia_inicio, dia_fin_real)
    random_day = random.randint(dia_inicio_real, dia_fin_real)
    return datetime(year, month, random_day)

def generar_extracto_simulado(nombre_archivo_salida="extracto_bancario.csv", 
                            anos_a_generar=10, 
                            objetivo_saldo_final_mes_cuenta_corriente=20.0):
    nomina_base = 1800.00
    incremento_anual_nomina = 0.02
    paga_extra_base_ratio = 0.6
    hipoteca_base = -450.00
    incremento_anual_hipoteca = 0.015
    suministros_base = -110.00
    variacion_suministros = 20.00
    transferencia_ahorro_base = -300.00
    incremento_anual_transferencia = 0.02
    supermercado_rango = (-150, -90)
    restauracion_rango = (-60, -25)
    gastos_varios_rango = (-150, -40)
    vacaciones_gasto_base = -1500 
    transacciones = []
    ano_inicio_sim = datetime.now().year - anos_a_generar
    for anio_offset in range(anos_a_generar):
        ano_actual_sim = ano_inicio_sim + anio_offset
        nomina_anual = nomina_base * ((1 + incremento_anual_nomina) ** anio_offset)
        paga_extra_anual = nomina_anual * paga_extra_base_ratio
        hipoteca_anual = hipoteca_base * ((1 + incremento_anual_hipoteca) ** anio_offset)
        transferencia_ahorro_anual = transferencia_ahorro_base * ((1 + incremento_anual_transferencia) ** anio_offset)
        for mes_idx in range(1, 13):
            ingresos_este_mes = 0
            gastos_fijos_este_mes = 0 
            gastos_variables_este_mes_lista = [] 
            fecha_nomina = generar_fecha_aleatoria_mes(ano_actual_sim, mes_idx, 1, 5)
            transacciones.append([fecha_nomina.strftime('%Y-%m-%d'), "NOMINA", round(nomina_anual, 2)])
            ingresos_este_mes += nomina_anual
            if mes_idx == 6 or mes_idx == 12:
                fecha_paga_extra = generar_fecha_aleatoria_mes(ano_actual_sim, mes_idx, 15, 25)
                transacciones.append([fecha_paga_extra.strftime('%Y-%m-%d'), f"PAGA EXTRA {'VERANO' if mes_idx == 6 else 'NAVIDAD'}", round(paga_extra_anual, 2)])
                ingresos_este_mes += paga_extra_anual
            if random.random() < 0.1: 
                monto_ing_extra = round(random.uniform(50, 400), 2)
                fecha_ing_extra = generar_fecha_aleatoria_mes(ano_actual_sim, mes_idx, 10, 20)
                transacciones.append([fecha_ing_extra.strftime('%Y-%m-%d'), "INGRESO EXTRAORDINARIO", monto_ing_extra])
                ingresos_este_mes += monto_ing_extra
            fecha_hipoteca = generar_fecha_aleatoria_mes(ano_actual_sim, mes_idx, 1, 5)
            monto_hipoteca_actual = round(hipoteca_anual, 2)
            transacciones.append([fecha_hipoteca.strftime('%Y-%m-%d'), "HIPOTECA", monto_hipoteca_actual])
            gastos_fijos_este_mes += abs(monto_hipoteca_actual)
            fecha_suministros = generar_fecha_aleatoria_mes(ano_actual_sim, mes_idx, 5, 10)
            monto_suministros = round(suministros_base + random.uniform(-variacion_suministros, variacion_suministros), 2)
            monto_suministros = -abs(monto_suministros) if monto_suministros > 0 else monto_suministros
            transacciones.append([fecha_suministros.strftime('%Y-%m-%d'), "SUMINISTROS (LUZ, AGUA, GAS)", monto_suministros])
            gastos_fijos_este_mes += abs(monto_suministros)
            fecha_transferencia = generar_fecha_aleatoria_mes(ano_actual_sim, mes_idx, 25, 28)
            monto_transferencia_actual = round(transferencia_ahorro_anual, 2)
            monto_transferencia_actual = -abs(monto_transferencia_actual) if monto_transferencia_actual > 0 else monto_transferencia_actual
            transacciones.append([fecha_transferencia.strftime('%Y-%m-%d'), "TRANSFERENCIA AHORRO", monto_transferencia_actual])
            gastos_fijos_este_mes += abs(monto_transferencia_actual)
            for _ in range(random.randint(3, 5)):
                monto = round(random.uniform(supermercado_rango[0], supermercado_rango[1]), 2)
                gastos_variables_este_mes_lista.append(("SUPERMERCADO", monto))
            for _ in range(random.randint(4, 8)):
                monto = round(random.uniform(restauracion_rango[0], restauracion_rango[1]), 2)
                gastos_variables_este_mes_lista.append(("RESTAURACION", monto))
            for _ in range(random.randint(2, 5)):
                monto = round(random.uniform(gastos_varios_rango[0], gastos_varios_rango[1]), 2)
                gastos_variables_este_mes_lista.append(("GASTOS VARIOS (OCIO, TRANSPORTE, ETC)", monto))
            if (mes_idx == 7 or mes_idx == 8) and random.random() < 0.7: 
                monto_vacaciones = round(vacaciones_gasto_base * random.uniform(0.8, 1.5), 2)
                monto_vacaciones = -abs(monto_vacaciones) if monto_vacaciones > 0 else monto_vacaciones
                gastos_variables_este_mes_lista.append(("GASTO VACACIONES", monto_vacaciones))
            total_gastos_variables_original = sum(abs(g[1]) for g in gastos_variables_este_mes_lista)
            margen_disponible_para_variables = ingresos_este_mes - gastos_fijos_este_mes - objetivo_saldo_final_mes_cuenta_corriente
            margen_disponible_para_variables = max(0, margen_disponible_para_variables) 
            factor_ajuste = 1.0
            if total_gastos_variables_original > 0:
                factor_ajuste = margen_disponible_para_variables / total_gastos_variables_original
            factor_ajuste = min(1.0, factor_ajuste) 
            for concepto_var, importe_var_original in gastos_variables_este_mes_lista:
                importe_var_ajustado = round(- (abs(importe_var_original) * factor_ajuste), 2)
                fecha_gasto_var = generar_fecha_aleatoria_mes(ano_actual_sim, mes_idx)
                transacciones.append([fecha_gasto_var.strftime('%Y-%m-%d'), concepto_var, importe_var_ajustado])
    df_final = pd.DataFrame(transacciones, columns=['Fecha', 'Concepto', 'Importe'])
    df_final['Fecha'] = pd.to_datetime(df_final['Fecha']) 
    df_final = df_final.sort_values(by='Fecha') 
    df_final['Fecha'] = df_final['Fecha'].dt.strftime('%Y-%m-%d') 
    df_final['Importe'] = df_final['Importe'].round(2)
    directorio_script = os.path.dirname(os.path.abspath(__file__))
    ruta_completa_salida = os.path.join(directorio_script, nombre_archivo_salida)
    try:
        df_final.to_csv(ruta_completa_salida, index=False, float_format='%.2f')
        print(f"Extracto bancario simulado generado y guardado en: {ruta_completa_salida}")
    except Exception as e:
        print(f"Error al guardar el CSV: {e}")

if __name__ == "__main__":
    generar_extracto_simulado(nombre_archivo_salida="extracto_bancario.csv", 
                            anos_a_generar=10, 
                            objetivo_saldo_final_mes_cuenta_corriente=25.0)

generarcsvsp500.py

import yfinance as yf
import datetime

ticker_symbol = "^GSPC"
end_date = datetime.date.today()
start_date = end_date - datetime.timedelta(days=10*365.25)
data = yf.download(ticker_symbol, start=start_date, end=end_date, interval="1mo")

if not data.empty:
    data_cleaned = data[['Close']].copy()
    data_cleaned.rename(columns={'Close': 'PrecioCierre'}, inplace=True)
    data_cleaned.index.name = 'Fecha'
    
    nombre_archivo_csv = "sp500_historico.csv"
    data_cleaned.to_csv(nombre_archivo_csv)
    print(f"Datos guardados en {nombre_archivo_csv}")
else:
    print("No se pudieron descargar los datos.")

generarcsvbitcoin.py

import yfinance as yf
import datetime

ticker_symbol_btc = "BTC-USD"
end_date_btc = datetime.date.today()
start_date_btc = end_date_btc - datetime.timedelta(days=10*365.25)
data_btc = yf.download(ticker_symbol_btc, start=start_date_btc, end=end_date_btc, interval="1mo")

if not data_btc.empty:
    data_btc_cleaned = data_btc[['Close']].copy()
    data_btc_cleaned.rename(columns={'Close': 'PrecioCierre'}, inplace=True)
    data_btc_cleaned.index.name = 'Fecha'
    
    nombre_archivo_csv_btc = "bitcoin_historico.csv"
    data_btc_cleaned.to_csv(nombre_archivo_csv_btc)
    print(f"Datos de Bitcoin guardados en {nombre_archivo_csv_btc}")
else:
    print(f"No se pudieron descargar los datos para {ticker_symbol_btc}.")

9.2. Scripts PySpark del Notebook EMR

Celda 1: Configuración y Carga de Datos

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lag, avg, expr, lit 
from pyspark.sql.window import Window
from pyspark.sql.types import DoubleType
import numpy as np

spark = SparkSession.builder.appName("SimuladorInversionMonteCarlo").getOrCreate()

# --- Parámetros ---
meta_financiera, plazo_anios, pct_sp500, pct_bitcoin = 60000.0, 5, 0.60, 0.10
plazo_meses = int(plazo_anios * 12)
pct_cash = 1.0 - pct_sp500 - pct_bitcoin
num_simulaciones = 10000
bucket_name = "mi-simulador-inversion-pablo-vidal" 

# --- Rutas S3 (Concatenadas) ---
s3_base = f"s3a://{bucket_name}"
path_sp500 = f"{s3_base}/datos-mercado/sp500_historico.csv"
path_bitcoin = f"{s3_base}/datos-mercado/bitcoin_historico.csv"
path_ahorro_usuario = f"{s3_base}/procesado-glue/aporte_simulacion_base/" 
path_params_mercado = f"{s3_base}/procesado-glue/parametros_mercado/" 

path_out_dist = f"{s3_base}/procesado-emr/resultados_simulacion_distribucion/"
path_out_median = f"{s3_base}/procesado-emr/resultados_simulacion_proyeccion_mediana/"
path_out_summary = f"{s3_base}/procesado-emr/sumario_simulacion/"

print(f"Spark: OK. Meta: {meta_financiera}, Plazo: {plazo_meses}m, S&P500:{pct_sp500:.0%}, BTC:{pct_bitcoin:.0%}, Cash:{pct_cash:.0%}")

try:
    print(f"Intentando cargar parámetros de mercado desde Glue: {path_params_mercado}")
    df_params_glue = spark.read.option("header", "true").option("inferSchema", "true").csv(path_params_mercado)
    
    params_sp500_glue = df_params_glue.filter(col("Activo") == "SP500").first() # Asegúrate que el valor es "SP500" y no "S&P500"
    mu_sp500 = params_sp500_glue["RetornoPromedioMensual_mu"]
    sigma_sp500 = params_sp500_glue["VolatilidadMensual_sigma"]
    print(f"S&P500 (desde Glue): μ={mu_sp500:.4%}, σ={sigma_sp500:.4%}")

    params_bitcoin_glue = df_params_glue.filter(col("Activo") == "Bitcoin").first()
    mu_bitcoin = params_bitcoin_glue["RetornoPromedioMensual_mu"]
    sigma_bitcoin = params_bitcoin_glue["VolatilidadMensual_sigma"]
    print(f"Bitcoin (desde Glue): μ={mu_bitcoin:.4%}, σ={sigma_bitcoin:.4%}")

except Exception as e_glue_params:
    print(f"No se pudieron cargar parámetros de mercado desde Glue ({e_glue_params}). Calculando directamente...")
    def get_market_params(path, name):
        df = spark.read.csv(path, header=True, inferSchema=True)
        df = df.withColumn("PrecioCierre", col("PrecioCierre").cast(DoubleType())) \
               .withColumn("Fecha", col("Fecha").cast("date")) 
        df = df.withColumn("RetornoMensual", (col("PrecioCierre") - lag("PrecioCierre", 1).over(Window.orderBy("Fecha"))) / lag("PrecioCierre", 1).over(Window.orderBy("Fecha")))
        stats = df.na.drop(subset=["RetornoMensual"]).select(avg("RetornoMensual").alias("mu"), expr("stddev_samp(RetornoMensual)").alias("sigma")).first()
        if not stats or stats["mu"] is None or stats["sigma"] is None: raise ValueError(f"Params NA for {name}")
        print(f"{name} (Calculado): μ={stats['mu']:.4%}, σ={stats['sigma']:.4%}")
        return stats["mu"], stats["sigma"]
    mu_sp500, sigma_sp500 = get_market_params(path_sp500, "S&P500")
    mu_bitcoin, sigma_bitcoin = get_market_params(path_bitcoin, "Bitcoin")

try:
    ahorro_df = spark.read.option("header","true").option("inferSchema","true").csv(path_ahorro_usuario)
    ahorro_mensual_usuario = ahorro_df.first()["AporteMensualBaseParaInversion"]
    print(f"Ahorro Usuario: {ahorro_mensual_usuario:.2f}")
except Exception as e_ahorro:
    print(f"Error cargando ahorro del usuario: {e_ahorro}. Revisar S3 y job de Glue (aporte_simulacion_base).")
    raise

Celda 2: Función de Simulación y Ejecución

import numpy as np 

def run_simulation_trajectory(sim_id): 
    local_mu_sp500, local_sigma_sp500 = mu_sp500, sigma_sp500
    local_mu_bitcoin, local_sigma_bitcoin = mu_bitcoin, sigma_bitcoin
    local_ahorro_mensual = ahorro_mensual_usuario
    local_plazo_meses = plazo_meses
    local_pct_sp500, local_pct_bitcoin, local_pct_cash = pct_sp500, pct_bitcoin, pct_cash

    cap_sp, cap_btc, cap_cash = 0.0, 0.0, 0.0
    trajectory = []
    for month in range(1, int(local_plazo_meses) + 1): 
        ret_sp = np.random.normal(local_mu_sp500, local_sigma_sp500)
        ret_btc = np.random.normal(local_mu_bitcoin, local_sigma_bitcoin)
        
        cap_sp = max(0, (cap_sp + local_ahorro_mensual * local_pct_sp500) * (1 + ret_sp))
        cap_btc = max(0, (cap_btc + local_ahorro_mensual * local_pct_bitcoin) * (1 + ret_btc))
        cap_cash += local_ahorro_mensual * local_pct_cash
        trajectory.append((sim_id, month, cap_cash, cap_sp, cap_btc, cap_cash + cap_sp + cap_btc))
    return (cap_cash + cap_sp + cap_btc, trajectory) 

simulation_ids_rdd = spark.sparkContext.parallelize(range(num_simulaciones), numSlices=100) 
resultados_rdd = simulation_ids_rdd.map(run_simulation_trajectory).cache()
df_final_capitals = resultados_rdd.map(lambda x: (x[0],)).toDF(["CapitalFinalSimulacion"])

print(f"Simulación de {num_simulaciones} trayectorias completada.")
df_final_capitals.show(5)

Celda 3: Análisis y Guardado de Resultados

from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType
from pyspark.sql.functions import col, abs as _abs, expr 

prob_exito = df_final_capitals.filter(col("CapitalFinalSimulacion") >= meta_financiera).count() / float(num_simulaciones) 
print(f"Probabilidad de alcanzar meta ({meta_financiera}): {prob_exito:.2%}")

median_capital = df_final_capitals.selectExpr(f"percentile_approx(CapitalFinalSimulacion, 0.5)").first()[0]
median_trajectory_data_list = resultados_rdd.min(key=lambda x: abs(x[0] - median_capital))[1] 

schema_trajectory = StructType([
    StructField("SimID", IntegerType()), StructField("Mes", IntegerType()),
    StructField("Capital_Cash", DoubleType()), StructField("Capital_SP500", DoubleType()),
    StructField("Capital_Bitcoin", DoubleType()), StructField("Capital_Total_Mes", DoubleType())
])
df_median_trajectory = spark.createDataFrame(median_trajectory_data_list, schema_trajectory)
print(f"Mediana Capital: {median_capital:.2f}. Trayectoria mediana (primeros 5 meses):")
df_median_trajectory.show(5)

df_summary = spark.createDataFrame(
    [(meta_financiera, float(plazo_meses), pct_sp500, pct_bitcoin, pct_cash, prob_exito, float(num_simulaciones), median_capital)],
    ["MetaFinanciera", "PlazoMeses", "Pct_SP500", "Pct_Bitcoin", "Pct_Cash", "ProbabilidadExito", "NumSimulaciones", "CapitalMedianoP50"]
)
print("Sumario de la Simulación:")
df_summary.show(truncate=False)

df_final_capitals.coalesce(1).write.mode("overwrite").option("header", "true").csv(path_out_dist)
df_median_trajectory.coalesce(1).write.mode("overwrite").option("header", "true").csv(path_out_median)
df_summary.coalesce(1).write.mode("overwrite").option("header", "true").csv(path_out_summary)
print(f"Resultados guardados en S3 en las carpetas de: {path_out_dist}, {path_out_median}, {path_out_summary}")

9.3. Scripts SQL de Transformación en AWS Glue

Job Extracto_Bancario:

Transformar_Extracto_Detallado
WITH TransaccionesConFecha AS (
    SELECT
        to_date(fecha, 'yyyy-MM-dd') AS FechaTransaccion,
        LOWER(concepto) as concepto_lower,
        CAST(importe AS DOUBLE) AS ImporteNumerico
    FROM myDataSource 
),
CategorizacionDetallada AS (
    SELECT
        YEAR(FechaTransaccion) AS Anio,
        MONTH(FechaTransaccion) AS Mes,
        ImporteNumerico,
        concepto_lower,
        CASE 
            WHEN ImporteNumerico > 0 THEN 'Ingreso_Operativo'
            WHEN concepto_lower LIKE '%transferencia%ahorro%' THEN 'Transferencia_Ahorro_Saliente'
            WHEN ImporteNumerico < 0 THEN 'Gasto_Operativo'
            ELSE 'Otro'
        END as TipoPrincipalMovimiento
    FROM TransaccionesConFecha
),
AgregadosPorCategoriaMes AS (
    SELECT
        Anio, Mes,
        SUM(CASE WHEN concepto_lower LIKE '%nomina%' AND ImporteNumerico > 0 THEN ImporteNumerico ELSE 0 END) AS Ing_Nomina,
        SUM(CASE WHEN concepto_lower LIKE '%paga%extra%' AND ImporteNumerico > 0 THEN ImporteNumerico ELSE 0 END) AS Ing_PagaExtra,
        SUM(CASE WHEN concepto_lower LIKE '%ingreso%extraordinario%' AND ImporteNumerico > 0 THEN ImporteNumerico ELSE 0 END) AS Ing_Extraordinario,
        SUM(CASE WHEN concepto_lower LIKE '%hipoteca%' AND ImporteNumerico < 0 THEN ABS(ImporteNumerico) ELSE 0 END) AS Gasto_Hipoteca,
        SUM(CASE WHEN concepto_lower LIKE '%suministros%' AND ImporteNumerico < 0 THEN ABS(ImporteNumerico) ELSE 0 END) AS Gasto_Suministros,
        SUM(CASE WHEN concepto_lower LIKE '%supermercado%' AND ImporteNumerico < 0 THEN ABS(ImporteNumerico) ELSE 0 END) AS Gasto_Supermercado,
        SUM(CASE WHEN concepto_lower LIKE '%restauracion%' AND ImporteNumerico < 0 THEN ABS(ImporteNumerico) ELSE 0 END) AS Gasto_Restauracion,
        SUM(CASE 
                WHEN ImporteNumerico < 0 AND NOT (concepto_lower LIKE '%hipoteca%') AND NOT (concepto_lower LIKE '%suministros%')
                AND NOT (concepto_lower LIKE '%supermercado%') AND NOT (concepto_lower LIKE '%restauracion%')
                AND NOT (concepto_lower LIKE '%vacaciones%') AND NOT (concepto_lower LIKE '%transferencia%ahorro%')
                THEN ABS(ImporteNumerico) ELSE 0 
            END) AS Gasto_OtrosVarios,
        SUM(CASE WHEN concepto_lower LIKE '%vacaciones%' AND ImporteNumerico < 0 THEN ABS(ImporteNumerico) ELSE 0 END) AS Gasto_Vacaciones,
        SUM(CASE WHEN TipoPrincipalMovimiento = 'Transferencia_Ahorro_Saliente' THEN ABS(ImporteNumerico) ELSE 0 END) AS Monto_TransferenciaAhorro
    FROM CategorizacionDetallada GROUP BY Anio, Mes
)
SELECT
    Anio, Mes,
    COALESCE(Ing_Nomina, 0) AS Ing_Nomina, COALESCE(Ing_PagaExtra, 0) AS Ing_PagaExtra, COALESCE(Ing_Extraordinario, 0) AS Ing_Extraordinario,
    (COALESCE(Ing_Nomina, 0) + COALESCE(Ing_PagaExtra, 0) + COALESCE(Ing_Extraordinario, 0)) AS Total_Ingresos_Mes,
    COALESCE(Gasto_Hipoteca, 0) AS Gasto_Hipoteca, COALESCE(Gasto_Suministros, 0) AS Gasto_Suministros,
    COALESCE(Gasto_Supermercado, 0) AS Gasto_Supermercado, COALESCE(Gasto_Restauracion, 0) AS Gasto_Restauracion,
    COALESCE(Gasto_OtrosVarios, 0) AS Gasto_OtrosVarios, COALESCE(Gasto_Vacaciones, 0) AS Gasto_Vacaciones,
    (COALESCE(Gasto_Hipoteca, 0) + COALESCE(Gasto_Suministros, 0) + COALESCE(Gasto_Supermercado, 0) + COALESCE(Gasto_Restauracion, 0) + COALESCE(Gasto_OtrosVarios, 0) + COALESCE(Gasto_Vacaciones, 0)) AS Total_Gastos_Operativos_Mes,
    COALESCE(Monto_TransferenciaAhorro, 0) AS Monto_TransferenciaAhorro,
    ((COALESCE(Ing_Nomina, 0) + COALESCE(Ing_PagaExtra, 0) + COALESCE(Ing_Extraordinario, 0)) - (COALESCE(Gasto_Hipoteca, 0) + COALESCE(Gasto_Suministros, 0) + COALESCE(Gasto_Supermercado, 0) + COALESCE(Gasto_Restauracion, 0) + COALESCE(Gasto_OtrosVarios, 0) + COALESCE(Gasto_Vacaciones, 0))) AS Ahorro_Potencial_Bruto_Mes,
    ((COALESCE(Ing_Nomina, 0) + COALESCE(Ing_PagaExtra, 0) + COALESCE(Ing_Extraordinario, 0)) - (COALESCE(Gasto_Hipoteca, 0) + COALESCE(Gasto_Suministros, 0) + COALESCE(Gasto_Supermercado, 0) + COALESCE(Gasto_Restauracion, 0) + COALESCE(Gasto_OtrosVarios, 0) + COALESCE(Gasto_Vacaciones, 0)) - COALESCE(Monto_TransferenciaAhorro, 0)) AS Saldo_Neto_Cuenta_Corriente_Mes
FROM AgregadosPorCategoriaMes ORDER BY Anio, Mes;
Calcular_Promedios_Gasto
SELECT 'Hipoteca' AS Categoria_Gasto, AVG(Gasto_Hipoteca) AS Promedio_Mensual_Gasto FROM myDataSource WHERE Gasto_Hipoteca > 0
UNION ALL
SELECT 'Suministros' AS Categoria_Gasto, AVG(Gasto_Suministros) AS Promedio_Mensual_Gasto FROM myDataSource WHERE Gasto_Suministros > 0
UNION ALL
SELECT 'Supermercado' AS Categoria_Gasto, AVG(Gasto_Supermercado) AS Promedio_Mensual_Gasto FROM myDataSource WHERE Gasto_Supermercado > 0
UNION ALL
SELECT 'Restauracion' AS Categoria_Gasto, AVG(Gasto_Restauracion) AS Promedio_Mensual_Gasto FROM myDataSource WHERE Gasto_Restauracion > 0
UNION ALL
SELECT 'OtrosVarios' AS Categoria_Gasto, AVG(Gasto_OtrosVarios) AS Promedio_Mensual_Gasto FROM myDataSource WHERE Gasto_OtrosVarios > 0
UNION ALL
SELECT 'Vacaciones' AS Categoria_Gasto, AVG(Gasto_Vacaciones) AS Promedio_Mensual_Gasto FROM myDataSource WHERE Gasto_Vacaciones > 0
UNION ALL
SELECT 'TransferenciaAhorro' AS Categoria_Gasto, AVG(Monto_TransferenciaAhorro) AS Promedio_Mensual_Gasto FROM myDataSource WHERE Monto_TransferenciaAhorro > 0;
Calcular_Aporte_Base_Simulacion
SELECT AVG(Monto_TransferenciaAhorro) AS AporteMensualBaseParaInversion
FROM myDataSource 
WHERE Monto_TransferenciaAhorro > 0;

Job Job_Procesar_Datos_Mercado:

SQL_Calc_Retornos_SP500
WITH PreciosOrdenados AS (
    SELECT to_date(col0, 'yyyy-MM-dd') AS FechaPrecio, CAST(col1 AS DOUBLE) AS Precio 
    FROM myDataSource ORDER BY FechaPrecio ASC
), PreciosConAnterior AS (
    SELECT FechaPrecio, Precio, LAG(Precio, 1, NULL) OVER (ORDER BY FechaPrecio ASC) AS PrecioAnterior
    FROM PreciosOrdenados
), RetornosMensuales AS (
    SELECT FechaPrecio, Precio, PrecioAnterior,
           CASE WHEN PrecioAnterior IS NOT NULL AND PrecioAnterior != 0 THEN (Precio - PrecioAnterior) / PrecioAnterior ELSE NULL END AS RetornoMensual
    FROM PreciosConAnterior WHERE PrecioAnterior IS NOT NULL
)
SELECT 'SP500' as Activo, FechaPrecio, RetornoMensual FROM RetornosMensuales WHERE RetornoMensual IS NOT NULL;
SQL_Calc_Retornos_Bitcoin
WITH PreciosOrdenados AS (
    SELECT to_date(col0, 'yyyy-MM-dd') AS FechaPrecio, CAST(col1 AS DOUBLE) AS Precio 
    FROM myDataSource ORDER BY FechaPrecio ASC
), PreciosConAnterior AS (
    SELECT FechaPrecio, Precio, LAG(Precio, 1, NULL) OVER (ORDER BY FechaPrecio ASC) AS PrecioAnterior
    FROM PreciosOrdenados
), RetornosMensuales AS (
    SELECT FechaPrecio, Precio, PrecioAnterior,
           CASE WHEN PrecioAnterior IS NOT NULL AND PrecioAnterior != 0 THEN (Precio - PrecioAnterior) / PrecioAnterior ELSE NULL END AS RetornoMensual
    FROM PreciosConAnterior WHERE PrecioAnterior IS NOT NULL
)
SELECT 'Bitcoin' as Activo, FechaPrecio, RetornoMensual FROM RetornosMensuales WHERE RetornoMensual IS NOT NULL;
SQL_Union_Retornos
SELECT Activo, FechaPrecio, RetornoMensual FROM myDataSourceSP500
UNION ALL
SELECT Activo, FechaPrecio, RetornoMensual FROM myDataSourceBitcoin;
SQL_Calc_Estadisticas_Mercado
SELECT
    Activo,
    AVG(RetornoMensual) AS RetornoPromedioMensual_mu,
    STDDEV_SAMP(RetornoMensual) AS VolatilidadMensual_sigma
FROM myDataSource 
GROUP BY Activo;